package com.data.dev.flink;

import com.data.dev.common.exception.AlarmException;
import com.data.dev.common.exception.ConfErrorCode;
import com.data.dev.common.javabean.BaseBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;

/**
 * @author wangxiaoming-ghq
 * 2022年6月17日15:29:22
 * Flink全局公共类，用于创建、执行会话
 */
@Slf4j
public class FlinkEnv extends BaseBean {

    /**
     * 2022年6月17日15:29:56
     * 初始化Flink执行环境
     * @return env:初始化后的执行环境
     */
    public static StreamExecutionEnvironment getFlinkEnv(){
        //① 创建Flink执行环境并设置checkpoint等必要的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1));// 1 分钟一次CheckPoint
        //env.setParallelism(1);//设置并行度，可在任务提交时进行设定
        CheckpointConfig checkpointConf = env.getCheckpointConfig();
        checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// CheckPoint 语义 EXACTLY ONCE
        checkpointConf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        return env;
    }

    /**
     * 提交作业到指定环境进行作业执行
     * 2022年6月17日15:30:47
     * @param env:Flink执行环境
     * @param jobName：Flink作业名称
     */
    public static  void envExec(StreamExecutionEnvironment env,String jobName){
        try {
            env.execute(jobName);
        } catch (Exception e) {
            e.printStackTrace();
            log.info("flink任务提交执行失败");
            throw AlarmException.asAlarmException(ConfErrorCode.FLINK_EXECUTE_FAIL, e);
        }

    }
}
